# Authors:
#   Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2013  Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function

import pytest

from ipaplatform.paths import paths
from ipapython.dn import DN
from ipaserver.install.replication import EXCLUDES
from ipatests.pytest_ipa.integration import tasks
from ipatests.test_integration.base import IntegrationTest
from ipatests.test_integration.test_topology import find_segment


def check_replication(source_host, dest_host, login):
    """Create a user on one host and verify it shows up on the other.

    The user is added with ``ipa user-add`` on ``source_host``. After
    ``tasks.wait_for_replication`` has been called on an LDAP connection
    to each host (source first, then destination), the entry is looked up
    on ``dest_host`` both directly over LDAP and through the ``ipa`` CLI.

    :param source_host: host on which the user is created
    :param dest_host: host on which the user is expected to appear
    :param login: uid of the user to create and look for
    :raises AssertionError: if the entry found on ``dest_host`` does not
        match, or ``ipa user-show`` output lacks the expected login line
    """
    add_user_cmd = [
        "ipa", "user-add", login,
        "--first", "test",
        "--last", "user",
    ]
    source_host.run_command(add_user_cmd)

    # Wait on the source side first, then on the destination side.
    tasks.wait_for_replication(source_host.ldap_connect())
    dest_conn = dest_host.ldap_connect()
    tasks.wait_for_replication(dest_conn)

    # Verification over LDAP: the entry must exist under cn=users,cn=accounts
    # of the destination's base DN and carry the requested uid.
    expected_dn = DN(
        ("uid", login),
        ("cn", "users"),
        ("cn", "accounts"),
        dest_host.domain.basedn,
    )
    replicated = dest_conn.get_entry(expected_dn)
    assert replicated.dn == expected_dn
    assert replicated["uid"] == [login]

    # Verification over the CLI
    expected_line = "User login: {}".format(login)
    shown = dest_host.run_command(['ipa', 'user-show', login])
    assert expected_line in shown.stdout_text


@pytest.mark.ds_acceptance
class TestSimpleReplication(IntegrationTest):
    """Simple replication test

    Runs against a master and a single replica (``num_replicas = 1``,
    ``topology = 'star'``): a user added on one host must also be present
    on the other one. Further tests exercise ``ipa-replica-manage``, the
    custodia check tool and the upgrade of replication agreements.

    NOTE(review): ``test_replica_removal`` deletes the replica from the
    master, so it presumably has to stay after the tests that still need
    the replica -- confirm before reordering.
    """
    num_replicas = 1
    topology = 'star'

    def test_user_replication_to_replica(self):
        """Test user replication master -> replica"""
        source, destination = self.master, self.replicas[0]
        check_replication(source, destination, 'testuser1')

    def test_user_replication_to_master(self):
        """Test user replication replica -> master"""
        source, destination = self.replicas[0], self.master
        check_replication(source, destination, 'testuser2')

    def test_replica_manage(self):
        """Test ipa-replica-manage list

        Ensure that ipa-replica-manage list -v <node> does not print
        last init status: None
        last init ended: 1970-01-01 00:00:00+00:00
        when the node never had any total update.
        The command is run twice: once as is, and once with the
        ``dirman_password`` from the master's config supplied on stdin.
        Test for ticket 7716.
        """
        unwanted_lines = (
            "last init ended: 1970-01-01 00:00:00+00:00",
            "last init status: None",
        )
        master = self.master
        replica_name = self.replicas[0].hostname

        # First run: nothing is fed to stdin
        listing = master.run_command(
            ["ipa-replica-manage", "list", "-v", replica_name])
        for unwanted in unwanted_lines:
            assert unwanted not in listing.stdout_text

        # Second run: same command, dirman password on stdin
        listing = master.run_command(
            ["ipa-replica-manage", "list", "-v", replica_name],
            stdin_text=master.config.dirman_password)
        for unwanted in unwanted_lines:
            assert unwanted not in listing.stdout_text

    def test_ipa_custodia_check(self):
        """Run the custodia check tool in both directions

        The tool at ``paths.IPA_CUSTODIA_CHECK`` is executed on the master
        with the replica's hostname as argument, then on the replica with
        the master's hostname.
        """
        replica = self.replicas[0]
        for runner, peer in ((self.master, replica), (replica, self.master)):
            runner.run_command([paths.IPA_CUSTODIA_CHECK, peer.hostname])

    def test_fix_agreements(self):
        """Test that upgrade fixes the list of attributes excluded from repl

        Test for ticket 9385
        """
        master = self.master

        # Prepare the server by removing some values from
        # the nsDS5ReplicatedAttributeList
        segment_name = find_segment(master, self.replicas[0], "domain")
        master.run_command([
            "ipa", "topologysegment-mod", "domain", segment_name,
            "--replattrs",
            "(objectclass=*) $ EXCLUDE memberof idnssoaserial entryusn"])

        # Run the upgrade and ensure that it updated the attribute
        # without error
        upgrade = master.run_command(["ipa-server-upgrade"])
        assert (
            "Error caught updating nsDS5ReplicatedAttributeList"
            not in upgrade.stdout_text
        )

        # Check the updated value, read from the mapping tree entry
        # of the domain suffix
        mapping_tree_dn = DN(
            ('cn', str(DN(master.domain.basedn))),
            ('cn', 'mapping tree'),
            ('cn', 'config'),
        )
        search = tasks.ldapsearch_dm(
            master, str(mapping_tree_dn), ["nsDS5ReplicatedAttributeList"])

        template = 'nsDS5ReplicatedAttributeList: (objectclass=*) $ EXCLUDE %s'
        expected_value = template % " ".join(EXCLUDES)
        # Both sides are lower-cased, so the comparison ignores case
        assert expected_value.lower() in search.stdout_text.lower()

    def test_replica_removal(self):
        """Test replica removal"""
        master = self.master
        replica_name = self.replicas[0].hostname

        before = master.run_command(['ipa-replica-manage', 'list'])
        assert replica_name in before.stdout_text

        # has to be run with --force, there is no --unattended
        master.run_command(
            ['ipa-replica-manage', 'del', replica_name, '--force'])

        after = master.run_command(
            ['ipa-replica-manage', 'list', '-v', master.hostname])
        assert replica_name not in after.stdout_text
